from pyspark import *
import os
if __name__ == '__main__':

    # Spin up a local Spark context that uses every available core.
    conf = SparkConf().setAppName("1_mapValues")
    conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Three key-value RDDs plus one plain integer RDD used by the demos below.
    rdd1 = sc.parallelize([("zadoop", 1), ("java", 2), ("python", 3), ("python", 10)])
    rdd2 = sc.parallelize([("zadoop", 1), ("zadoop", 2), ("zadoop", 4), ("zadoop", 3)])
    rdd3 = sc.parallelize([("zadoop", 2), ("java", 2), ("python", 3), ("python", 10)])
    rdd4 = sc.parallelize([1, 2, 3, 4, 5, 6, 7])

    # takeOrdered: first the 3 smallest by natural ordering, then the 3
    # largest by value (negating the value inverts the sort order).
    print(rdd1.takeOrdered(3))
    print(rdd2.takeOrdered(3))
    print(rdd1.takeOrdered(3, lambda kv: -kv[1]))
    print(rdd3.takeOrdered(3, lambda kv: -kv[1]))

    # foreach is an action executed on the workers; the printed lines go to
    # executor stdout (with local[*] that is this process's console).
    rdd1.foreach(lambda kv: print(kv[0] + '@' + str(kv[1])))
    def foreach_partition_func(itr) -> None:
        """Print every element of a single partition (runs on the executor)."""
        for element in itr:
            print(element)
    rdd1.foreachPartition(foreach_partition_func)
    def mapPartitionsFunc(itr) -> list:
        """Materialize one partition's iterator into a list, unchanged.

        mapPartitions expects the function to return an iterable of output
        elements; returning the input elements as-is makes this an identity
        transformation over each partition.
        """
        # list(itr) replaces the original manual for/append loop — same
        # result, one C-level pass (idiomatic, ruff PERF402).
        return list(itr)
    print(rdd1.mapPartitions(mapPartitionsFunc).collect())
    # Fixed routing table for the custom partitioner: zadoop -> 0, java -> 1,
    # every other key -> 2.
    _PARTITION_OF = {'zadoop': 0, 'java': 1}

    def partitionByFunc(k) -> int:
        """Return the target partition index (0, 1 or 2) for key *k*."""
        return _PARTITION_OF.get(k, 2)

    # partitionBy targets key-value RDDs.
    print(rdd1.partitionBy(3, partitionByFunc).glom().collect())
    print(rdd1.getNumPartitions())
    # NOTE(review): rdd4 holds plain ints rather than pairs, and no action is
    # triggered here, so this only prints the lazy RDD object — confirm this
    # line is intentional.
    print(rdd4.partitionBy(3,))
    # Increasing the partition count almost certainly triggers a shuffle.
    print(rdd1.repartition(3).getNumPartitions())
    print(rdd1.repartition(10).getNumPartitions())
    # coalesce only shrinks the partition count unless shuffle=True is given,
    # which is why the second call can grow it to 10.
    print(rdd1.coalesce(3).getNumPartitions())
    print(rdd1.coalesce(10, True).getNumPartitions())

    # rdd1.saveAsTextFile("../output")
    # Build a file:// URI pointing at <parent-of-cwd>/data/output1 and save
    # rdd1 there as plain text, one file per partition.
    parent_dir = os.path.dirname(os.getcwd())
    local_path = "file://" + parent_dir + '/data/output1'
    print(local_path)
    rdd1.saveAsTextFile(local_path)
    sc.stop()








